﻿/**

 * Copyright (c) 2015-2016, FastDev 刘强 (fastdev@163.com) & Quincy.

 *

 * Licensed under the Apache License, Version 2.0 (the "License");

 * you may not use this file except in compliance with the License.

 * You may obtain a copy of the License at

 *

 *      http://www.apache.org/licenses/LICENSE-2.0

 *

 * Unless required by applicable law or agreed to in writing, software

 * distributed under the License is distributed on an "AS IS" BASIS,

 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

 * See the License for the specific language governing permissions and

 * limitations under the License.

 */

using ConsoleApplication1.Cluster.Entity;
using OF.DistributeService.Core.Common;
using OF.Notify.Common;
using OF.Notify.DataHost.Cluster.Disk;
using OF.Notify.DataHost.Cluster.Entity;
using OF.Notify.Entity;
using OF.Notify.Master;
using OF.Notify.Test;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace OF.Notify.DataHost.Cluster
{
    public class TopicDataNode
    {
		// Not referenced in the visible part of this file; presumably an upper bound on node count — TODO confirm.
		public const int MaxNodesCount = 102400;
	
        // Topic identifier supplied to the constructor.
        internal byte topicEnum;
        // Owning data node supplied to the constructor; source of the node id, online-node list and node proxies.
        internal DataNode dataNode;

        // Starts at ClusterContext.InvalidateId; reported by Dump().
        public int lastClusterId = ClusterContext.InvalidateId;
        // Queried with this node's id to obtain the ids of its clusters (see GetRuntimeData).
        public IIdToBigValuesMapper currentNodeClusterMapper = null;
        // Per-cluster list of node id/status flags (SmallValue16AndFlag).
        public IIdToSmallValuesMapper clusterNodeMapper = null;
        // Receives cluster node changes via Append (see SendSaveClusterNodeChangeToNotify).
        public ClusterNodeChangeSendCache clusterChangeSendCache = null;
        // Per-cluster list of collection ids.
        public IIdToBigValuesMapper clusterCollectionMapper = null;
        // Currently active online collection store, or null; read and cleared inside the
        // ClusterContext.StringLock callback keyed by LockActiveCollectionStoreKey.
        public DiskCollectionOnlineMessageStore _activeCollectionStore = null;
		// Saves synced collection messages and touches/deletes per-cluster temp paths.
		internal DiskCollectionSyncMessageStore syncCollectionStore = null;
		
		// Key passed to ClusterContext.StringLock; built in the constructor from the topic enum.
		public string LockActiveCollectionStoreKey;
		
		// Topic-level context obtained from ClusterContext.Get().GetTopicContext(topicEnum).
		internal TopicClusterContext topicContext;
        // One ConsumerTopicDataNode per consumer enum of the topic.
        internal ConsumerInstanceProvider<ConsumerTopicDataNode> consumerTopicDataProvider = null;
        
        /// <summary>
        /// Creates the per-topic state for <paramref name="dataNode"/>: the lock key, the topic
        /// context, the change-send cache, one <c>ConsumerTopicDataNode</c> per consumer enum,
        /// the three disk-backed mappers and the sync message store.
        /// </summary>
        /// <param name="dataNode">Owning data node; its Id is used to build storage paths.</param>
        /// <param name="topicEnum">Topic identifier.</param>
        public TopicDataNode(DataNode dataNode, byte topicEnum)
        {
            short nodeId = dataNode.Id;
            this.dataNode = dataNode;
            this.topicEnum = topicEnum;
            LockActiveCollectionStoreKey = string.Format("`LACS`{0}", (byte)topicEnum);

            var clusterContext = ClusterContext.Get();
            topicContext = clusterContext.GetTopicContext(topicEnum);
            clusterChangeSendCache = new ClusterNodeChangeSendCache(GetDiskOfflineMaperFunc);

            // Statement order is kept: each consumer node receives `this` and may read fields set above.
            var consumerNodes = new Dictionary<byte, ConsumerTopicDataNode>();
            foreach (var consumerEnum in topicContext.GetAllConsumerEnums())
            {
                var consumerNode = new ConsumerTopicDataNode(topicEnum, consumerEnum, topicContext, dataNode, this);
                consumerNodes.Add(consumerEnum, consumerNode);
            }
            consumerTopicDataProvider = new ConsumerInstanceProvider<ConsumerTopicDataNode>(consumerNodes);

            clusterNodeMapper = new DiskIdToSmallValuesMapper(
                topicContext.GetClusterMapNodeStorageBasePath(dataNode.Id),
                (clusterId) => topicContext.GetClusterMapNodeStoragePath(nodeId, clusterId));
            currentNodeClusterMapper = new DiskIdToBigValuesMapper(
                topicContext,
                (clusterId) => topicContext.GetCurrentNodeClusterStoragePath(nodeId, clusterId));
            clusterCollectionMapper = new DiskIdToBigValuesMapper(
                topicContext,
                (clusterId) => topicContext.GetCollectionStoragePath(nodeId, clusterId));
            syncCollectionStore = new DiskCollectionSyncMessageStore(topicEnum);
        }

		/// <summary>
		/// Starts a new thread whose entry point is the parameterless SyncClusterCollection.
		/// </summary>
		public void StartSyncThread()
		{
			ThreadStart entryPoint = new ThreadStart(SyncClusterCollection);
			Thread worker = new Thread(entryPoint);
			worker.Start();
		}

        /// <summary>
        /// Logs a diagnostic snapshot of this topic node: node id, sequence number, last cluster id,
        /// the node's clusters, each cluster's nodes and collections, and the online node list.
        /// Whatever was collected is always logged (in the <c>finally</c> block), even when the
        /// method returns early or throws.
        /// </summary>
        public void Dump()
        {
            StringBuilder sb = new StringBuilder();
            try
            {
                sb.AppendLine(string.Format(topicEnum.ToString() + " Dump Node Id:{0}, No:{1}, ClusterId:{2}", dataNode.Id, dataNode.sequenceNo, lastClusterId));
                if (currentNodeClusterMapper == null)
                {
                    return;
                }
                var clusters = currentNodeClusterMapper.GetTop(dataNode.Id);
                if (clusters == null)
                {
                    return;
                }
                sb.AppendLine(string.Format("nodeClusters:{0}", SafeStringJoin(" ", clusters)));

                var clusterNodeLines = clusters.Select(clusterId =>
                {
                    // GetValues may return null for a cluster (RaiseSyncClusterFinished guards against it);
                    // treat that as "no nodes" instead of failing the whole dump.
                    var nodeFlags = clusterNodeMapper.GetValues(clusterId) ?? new List<SmallValue16AndFlag>();
                    var nodeIds = nodeFlags.Where(item => item != null).Select(item => item.Value);
                    return string.Format("clusterid:{0}-> nodes:{1}", clusterId, SafeStringJoin(" ", nodeIds));
                });
                sb.AppendLine(string.Format("clusterNodeMapper:{0}", SafeStringJoin(Environment.NewLine, clusterNodeLines)));

                // SafeStringJoin yields an empty string for null, so one GetTop call per cluster suffices.
                var clusterCollectionLines = clusters.Select(clusterId =>
                    string.Format("clusterId:{0}, collets:{1}", clusterId, SafeStringJoin(" ", clusterCollectionMapper.GetTop(clusterId))));
                sb.AppendLine(string.Format("clusterCollectionMapper:{0}", SafeStringJoin(Environment.NewLine, clusterCollectionLines)));

                sb.AppendLine(string.Format("GetOnlineNodeIdList():{0}", SafeStringJoin(" ", dataNode.GetOnlineNodeIdList())));
                //DumpMessages(clusters);
                sb.AppendLine("");
                sb.AppendLine("");
            }
            finally
            {
                Util.LogInfo(sb.ToString());
            }
        }

        /// <summary>
        /// Builds a snapshot of this node's runtime state: its distinct cluster ids, the synced
        /// nodes of each cluster, each cluster's collection ids, and the online node list.
        /// </summary>
        /// <returns>
        /// The snapshot, or null when <c>currentNodeClusterMapper</c> has not been set.
        /// A cluster with no stored node list or no collection list gets no entry in the
        /// corresponding map.
        /// </returns>
        public NodeRuntimeData GetRuntimeData()
        {
            if (currentNodeClusterMapper == null)
            {
                return null;
            }
            NodeRuntimeData result = new NodeRuntimeData();
            result.NodeId = dataNode.Id;
            result.ClusterIds = currentNodeClusterMapper.GetTop(dataNode.Id);
            if (result.ClusterIds != null)
            {
                result.ClusterIds = result.ClusterIds.Distinct().ToList();
                result.clusterNodeMap = new Dictionary<int, List<short>>();
                result.clusterCollectionMap = new Dictionary<int, List<int>>();
                foreach (var clusterId in result.ClusterIds)
                {
                    // GetValues may return null (RaiseSyncClusterFinished guards against it), so check
                    // before filtering; the former null check ran after ToList() and never fired.
                    List<SmallValue16AndFlag> nodeFlags = clusterNodeMapper.GetValues(clusterId);
                    if (nodeFlags != null)
                    {
                        List<short> syncedNodes = nodeFlags
                            .Where(item => item != null && item.Status == SmallValue16AndFlagStatus.Synced)
                            .Select(item => item.Value)
                            .ToList();
                        result.clusterNodeMap.Add(clusterId, syncedNodes);
                    }

                    List<int> collections = clusterCollectionMapper.GetTop(clusterId);
                    if (collections != null)
                    {
                        result.clusterCollectionMap.Add(clusterId, collections);
                    }
                }
            }
            result.OnlineNodes = dataNode.GetOnlineNodeIdList();
            return result;
        }

        /// <summary>
        /// Checks, inside the <c>ClusterContext.StringLock</c> callback keyed by
        /// <c>LockActiveCollectionStoreKey</c>, whether the active collection store exists and
        /// has the given collection id.
        /// </summary>
        /// <param name="collectionId">Collection id to compare against the active store.</param>
        /// <param name="currentStore">Receives the active store observed in the callback (may be null).</param>
        /// <param name="activeAction">Optional action run inside the same callback, only on a match.</param>
        /// <returns>True when the active store is non-null and its collection id matches.</returns>
        internal bool IsCollectionIdOnActive(int collectionId, out DiskCollectionOnlineMessageStore currentStore, Action activeAction = null)
        {
            bool isActive = false;
            DiskCollectionOnlineMessageStore observedStore = null;
            ClusterContext.StringLock.Invoke(LockActiveCollectionStoreKey, () =>
            {
                observedStore = _activeCollectionStore;
                isActive = observedStore != null && observedStore.GetCollectionId() == collectionId;
                if (isActive && activeAction != null)
                {
                    activeAction();
                }
            });
            currentStore = observedStore;
            return isActive;
        }

        /// <summary>
        /// Returns the messages of a collection; forwards to <c>GetClusterMessageListCore</c>.
        /// </summary>
        /// <param name="collectionId">Collection whose messages are requested.</param>
        internal List<TopicMessage> GetClusterMessageList(int collectionId)
        {
            List<TopicMessage> messages = GetClusterMessageListCore(collectionId);
            return messages;
        }

        /// <summary>
        /// Deletes the stored collection once every consumer of the topic reports it as processed.
        /// Does nothing if any consumer has not processed it.
        /// </summary>
        /// <param name="collectionId">Collection to check and possibly delete.</param>
        internal void DeleteCollectionIfAllConsumed(int collectionId)
        {
            bool processedByAll = true;
            foreach (var consumerEnum in topicContext.GetAllConsumerEnums())
            {
                var consumerNode = this.consumerTopicDataProvider.GetInstance(consumerEnum);
                if (!consumerNode.IsCollectionProcessed(collectionId))
                {
                    // Stop at the first consumer that still needs the collection.
                    processedByAll = false;
                    break;
                }
            }

            if (processedByAll)
            {
                DiskCollectionOnlineMessageStore.Delete(topicEnum, dataNode.Id, collectionId);
            }
        }

        /// <summary>
        /// Returns the messages of a collection: from the active online store when the collection
        /// is the active one, otherwise from <c>DiskCollectionOnlineMessageStore.GetMessages</c>.
        /// </summary>
        /// <param name="collectionId">Collection whose messages are requested.</param>
        /// <remarks>A failure in the stored-messages path is logged and then rethrown unchanged.</remarks>
        internal List<TopicMessage> GetClusterMessageListCore(int collectionId)
        {
            DiskCollectionOnlineMessageStore activeStore = null;
            if (IsCollectionIdOnActive(collectionId, out activeStore))
            {
                return activeStore.GetOnLineMessageList();
            }

            try
            {
                return DiskCollectionOnlineMessageStore.GetMessages(topicEnum, dataNode.Id, collectionId);
            }
            catch (Exception)
            {
                // Log the requested id next to the active store's id (-1 when there is none).
                var activeCollectionId = (activeStore == null) ? -1 : activeStore.GetCollectionId();
                Util.LogInfo("get failed collectionId is:" + collectionId + "," + activeCollectionId);
                throw;
            }
        }

        /// <summary>
        /// Logs, for every collection of every given cluster, the first four bytes of each message
        /// body read as an Int32.
        /// </summary>
        /// <param name="clusters">Cluster ids whose collections are dumped.</param>
        internal void DumpMessages(List<int> clusters)
        {
            foreach (var clusterId in clusters)
            {
                var collectionIds = clusterCollectionMapper.GetTop(clusterId);
                if (collectionIds != null && collectionIds.Count != 0)
                {
                    foreach (var collectionId in collectionIds)
                    {
                        List<TopicMessage> messages = GetClusterMessageList(collectionId);
                        if (messages == null)
                        {
                            continue;
                        }
                        var bodyValues = messages.Select(message => BitConverter.ToInt32(message.body, 0));
                        Util.LogInfo("Dump Messages 1, collectionId:" + collectionId + ", " + string.Join(" ", bodyValues));
                    }
                }
            }
        }
        
        /// <summary>
        /// Classifies the requested unprocessed collections: a collection counts as finished when it
        /// is not the active one and the disk store contains it. Also returns the consumer's
        /// processing and processed collection lists.
        /// </summary>
        /// <param name="request">Carries the unprocessed collection ids and the consumer enum.</param>
        /// <returns>A success result holding the three lists.</returns>
        public CallServiceResult<SendGetUnProcessedFinishedCollectionsDTO> SendGetUnProcessedFinishedCollections(SendGetUnProcessedFinishedCollectionsRequest request)
        {
            List<int> finished = new List<int>();
            foreach (var collectionId in request.unProcessedCollections)
            {
                DiskCollectionOnlineMessageStore activeStore = null;
                bool isActive = IsCollectionIdOnActive(collectionId, out activeStore);
                // The disk store is only consulted when the collection is not the active one.
                if (!isActive && DiskCollectionOnlineMessageStore.Contains(topicEnum, dataNode.Id, collectionId))
                {
                    finished.Add(collectionId);
                }
            }

            var consumerNode = this.consumerTopicDataProvider.GetInstance(request.consumerEnum);
            List<int> processing = consumerNode.GetProcessingCollectionList(request);
            List<int> processed = consumerNode.GetProcessedCollectionList(request);

            var dto = new SendGetUnProcessedFinishedCollectionsDTO
            {
                finishedCollectionList = finished,
                processingCollectionList = processing,
                processedCollectionList = processed
            };
            return CallServiceResult<SendGetUnProcessedFinishedCollectionsDTO>.GetSuccess(dto);
        }

        /// <summary>
        /// Filters the requested collections down to those present in the disk store, hands them to
        /// the consumer's <c>ConsumeCollections</c>, and returns the list it reports.
        /// </summary>
        /// <param name="request">Carries the collection ids and the consumer enum.</param>
        public CallServiceResult<List<int>> SendProcessCollections(SendProcessCollectionsRequest request)
        {
            List<int> storedCollections = new List<int>();
            foreach (var collectionId in request.collections)
            {
                if (!DiskCollectionOnlineMessageStore.Contains(topicEnum, dataNode.Id, collectionId))
                {
                    continue;
                }
                storedCollections.Add(collectionId);
            }

            var consumerNode = consumerTopicDataProvider.GetInstance(request.consumerEnum);
            List<int> consumed = consumerNode.ConsumeCollections(storedCollections);
            return CallServiceResult<List<int>>.GetSuccess(consumed);
        }

        /// <summary>
        /// Looks up the proxy for a node id in the data node's current node list.
        /// </summary>
        /// <param name="nodeId">Id of the node to find.</param>
        /// <returns>
        /// The first proxy whose id matches; null when the data node is disposed, the current
        /// node list is null, or no proxy matches.
        /// </returns>
        internal DataNodeProxy GetProxy(short nodeId)
        {
            if (dataNode.isDisposed)
            {
                return null;
            }

            var knownProxies = dataNode.GetCurrentDataNodeList();
            return knownProxies == null
                ? null
                : knownProxies.FirstOrDefault(node => node.GetId() == nodeId);
        }

        /// <summary>
        /// Resolves each cluster node flag to its proxy in the data node's current node list,
        /// skipping flags with no matching proxy.
        /// </summary>
        /// <param name="allClusterNodes">Node flags whose <c>Value</c> is the node id to resolve.</param>
        /// <returns>
        /// The resolved proxies in input order; null when the data node is disposed or the
        /// current node list is null.
        /// </returns>
        internal List<DataNodeProxy> GetProxy(List<SmallValue16AndFlag> allClusterNodes)
        {
            if (dataNode.isDisposed)
            {
                return null;
            }
            var knownProxies = dataNode.GetCurrentDataNodeList();
            if (knownProxies == null)
            {
                return null;
            }

            var resolved = new List<DataNodeProxy>(allClusterNodes.Count);
            foreach (SmallValue16AndFlag nodeFlag in allClusterNodes)
            {
                DataNodeProxy match = knownProxies.FirstOrDefault(node => node.GetId() == nodeFlag.Value);
                if (match != null)
                {
                    resolved.Add(match);
                }
            }
            return resolved;
        }

        /// <summary>
        /// Returns the proxies with those belonging to this node moved to the end.
        /// </summary>
        /// <param name="list">Proxies to reorder; never modified.</param>
        /// <returns>
        /// The same list instance when it holds no proxy for this node; otherwise a reordered copy.
        /// </returns>
        internal List<DataNodeProxy> GetSenderMoveToLastList(List<DataNodeProxy> list)
        {
            var ownProxies = list.Where(proxy => proxy.GetId() == dataNode.Id).ToList();
            if (ownProxies.Count == 0)
            {
                return list;
            }

            // Work on a copy so the caller's list is left untouched.
            var reordered = list.ToList();
            foreach (var ownProxy in ownProxies)
            {
                reordered.Remove(ownProxy);
            }
            reordered.AddRange(ownProxies);
            return reordered;
        }


        /// <summary>
        /// Sends the cluster-node-change request to every resolvable node of the cluster in
        /// parallel (this node last in the list). Each send is retried through
        /// <c>Util.SafeLoopUtilTrue</c>; a node reported offline counts as done.
        /// </summary>
        /// <param name="request">Request forwarded unchanged to each target node.</param>
        /// <returns>
        /// Success(true) after the fan-out completes; Success(false) when the proxies could not
        /// be resolved.
        /// </returns>
        public CallServiceResult<bool> MultiSendSaveClusterNodeChangeToNotify(SendSaveClusterNodeChangeToNotifyRequest request)
        {
            var targetProxies = GetProxy(request.allClusterNodes);
            if (targetProxies == null)
            {
                return CallServiceResult<bool>.GetSuccess(false);
            }

            var context = ClusterContext.Get();
            var orderedTargets = GetSenderMoveToLastList(targetProxies);
            var parallelOptions = ClusterContext.GetParallelOptions();
            Parallel.ForEach(orderedTargets, parallelOptions, (toNode) =>
            {
                Util.SafeLoopUtilTrue((loopI) =>
                {
                    // An offline target is treated as finished; otherwise the call's success decides.
                    return dataNode.IsNodeOffLine(toNode.GetId())
                        || toNode.SendSaveClusterNodeChangeToNotify(request).IsSuccess();
                }, context.GetMaxApiRetryCount(), context.GetRetryFailSleepMS());
            });
            return CallServiceResult<bool>.GetSuccess(true);
        }

        /// <summary>
        /// Appends the request's offline nodes, cluster id and full node list to the
        /// change-send cache.
        /// </summary>
        /// <param name="request">Cluster node change to record.</param>
        /// <returns>Always Success(true).</returns>
        public CallServiceResult<bool> SendSaveClusterNodeChangeToNotify(SendSaveClusterNodeChangeToNotifyRequest request)
        {
            var offlineNodes = request.clusterOffLineNodes;
            var clusterId = request.clusterId;
            var allNodes = request.allClusterNodes;
            clusterChangeSendCache.Append(offlineNodes, clusterId, allNodes);
            return CallServiceResult<bool>.GetSuccess(true);
        }

        /// <summary>
        /// Calls <c>ResetNodeStatus</c>, then returns the <c>processedMergeTree</c> of every
        /// consumer of the topic, keyed by consumer enum.
        /// </summary>
        /// <param name="request">Not read by this method.</param>
        public CallServiceResult<Dictionary<byte, IdTreeNode>> GetProcessedTreeForRestore(GetProcessedTreeForRestoreRequest request)
        {
            ResetNodeStatus();

            var treesByConsumer = new Dictionary<byte, IdTreeNode>();
            foreach (byte consumerEnum in topicContext.GetAllConsumerEnums())
            {
                var consumerNode = consumerTopicDataProvider.GetInstance(consumerEnum);
                treesByConsumer.Add(consumerEnum, consumerNode.processedMergeTree);
            }
            return CallServiceResult<Dictionary<byte, IdTreeNode>>.GetSuccess(treesByConsumer);
        }


        /// <summary>
        /// When the requested collection is the active one, transfers its online messages to the
        /// requested consumer.
        /// </summary>
        /// <param name="request">Carries the collection id and the consumer enum.</param>
        /// <returns>
        /// Success with the transfer outcome; Success(false) when the collection is not active.
        /// </returns>
        public CallServiceResult<bool> SendInstantProcessCollection(SendInstantProcessCollectionRequest request)
        {
            DiskCollectionOnlineMessageStore activeStore = null;
            if (!IsCollectionIdOnActive(request.collectionId, out activeStore))
            {
                return CallServiceResult<bool>.GetSuccess(false);
            }

            ConsumerTopicDataNode consumerNode = consumerTopicDataProvider.GetInstance(request.consumerEnum);
            bool transferred = activeStore.TransferOnlineMessagesToConsumer(consumerNode);
            return CallServiceResult<bool>.GetSuccess(transferred);
        }

        /// <summary>
        /// Disposes the active online collection store if it matches the request's collection id,
        /// then does the same for the request's last collection id.
        /// </summary>
        /// <param name="request">Carries <c>collectionId</c> and <c>lastCollectionId</c>.</param>
        /// <returns>Always Success(true).</returns>
        public CallServiceResult<bool> SendDisposeOnlineCollection(SendDisposeOnlineCollectionRequest request)
        {
            // Current id first, then the previous one - same order as the two direct calls.
            int[] idsToDispose = new int[] { request.collectionId, request.lastCollectionId };
            foreach (int collectionId in idsToDispose)
            {
                OnDisposeOnlineCollectionCore(collectionId);
            }
            return CallServiceResult<bool>.GetSuccess(true);
        }

        /// <summary>
        /// Forwards an instant-collection-finished notification to the master node on a
        /// background task.
        /// </summary>
        /// <param name="request">Topic, consumer, outcome, cluster id and collection id to report.</param>
        /// <returns>
        /// Success(true) once the task has been started; an error result (code 1) when this data
        /// node has no master node.
        /// </returns>
        public CallServiceResult<bool> SendInstantCollectionFinished(SendInstantCollectionFinishedRequest request)
        {
            if (dataNode.masterNode == null)
            {
                return CallServiceResult<bool>.GetError(1, "current node is not master node!");
            }

            TaskHelper.SafeStartTask(() =>
            {
                dataNode.masterNode.OnInstantCollectionFinished(
                    request.topicEnum,
                    request.consumerEnum,
                    request.isSuccessed,
                    request.clusterId,
                    request.collectionId);
            });
            return CallServiceResult<bool>.GetSuccess(true);
        }

        /// <summary>
        /// Disposes and clears the active collection store when it is the given collection.
        /// The dispose runs as the <c>activeAction</c> of <c>IsCollectionIdOnActive</c>, i.e. inside
        /// the same locked callback that performs the id check.
        /// </summary>
        /// <param name="collectionId">Collection whose active store should be disposed.</param>
        internal void OnDisposeOnlineCollectionCore(int collectionId)
        {
            DiskCollectionOnlineMessageStore observedStore = null;
            IsCollectionIdOnActive(collectionId, out observedStore, () =>
            {
                if (_activeCollectionStore == null)
                {
                    return;
                }
                //Util.LogInfo("DoDispose collection:" + _activeCollectionStore.GetCollectionId() + ", on node:" + this.notifyHostId);
                _activeCollectionStore.DoDispose();
                _activeCollectionStore = null;
            });
        }

        /// <summary>
        /// Merges the old and new node flags of a cluster, stores the result through
        /// <c>clusterNodeMapper.ResetValues</c> and returns it.
        /// Items are ordered by <c>GetOrderValue()</c>; within a run of adjacent items that share
        /// the same <c>Value</c> only the last one is kept.
        /// </summary>
        /// <param name="clusterId">Cluster whose mapping is rewritten.</param>
        /// <param name="oldIdList">Previously stored flags; null is treated as empty.</param>
        /// <param name="newIdList">Incoming flags.</param>
        /// <returns>The merged list that was stored.</returns>
        internal List<SmallValue16AndFlag> MergeClusterNodeMappings(int clusterId, List<SmallValue16AndFlag> oldIdList,
            List<SmallValue16AndFlag> newIdList)
        {
            List<SmallValue16AndFlag> existing = oldIdList ?? new List<SmallValue16AndFlag>();
            var ordered = existing.Concat(newIdList).OrderBy(item => item.GetOrderValue());

            List<SmallValue16AndFlag> merged = new List<SmallValue16AndFlag>(newIdList.Count);
            SmallValue16AndFlag previous = null;
            foreach (var current in ordered)
            {
                // Emit the previous item only once a different Value starts a new run.
                if (previous != null && previous.Value != current.Value)
                {
                    merged.Add(previous);
                }
                previous = current;
            }
            // The final run has no successor to trigger its emission.
            if (previous != null)
            {
                merged.Add(previous);
            }

            clusterNodeMapper.ResetValues(clusterId, merged);
            return merged;
        }

        /// <summary>
        /// Starts a background task that walks the pages supplied by
        /// <c>clusterChangeSendCache.GetClusterSyncNodes</c> for the requested node, sends each page
        /// to that node, and stores the changed cluster node lists it returns.
        /// Each page is retried through <c>Util.SafeLoopUtilTrue</c>; a missing proxy or an offline
        /// node ends the attempts for that page.
        /// </summary>
        /// <param name="request">Carries the id of the node to exchange data with.</param>
        /// <returns>Always Success(true); the work itself runs on the started task.</returns>
        public CallServiceResult<bool> SendGetClusterNodeChangeToNotifyCmd(SendGetClusterNodeChangeToNotifyCmdRequest request)
        {
            TaskHelper.SafeStartTask(() =>
            {
                short nodeId = request.nodeId;
                var context = ClusterContext.Get();
                clusterChangeSendCache.GetClusterSyncNodes(nodeId, (clusterNodesPage) =>
                {
                    Util.SafeLoopUtilTrue((loopI) =>
                    {
                        var proxy = GetProxy(nodeId);
                        if (proxy == null || dataNode.IsNodeOffLine(nodeId))
                        {
                            return true;
                        }

                        var pageRequest = new SendGetClusterNodeChangeToNotifyDataRequest
                        {
                            topicEnum = topicEnum,
                            clusterNodesList = clusterNodesPage
                        };
                        var callResult = proxy.SendGetClusterNodeChangeToNotifyData(pageRequest);
                        if (!callResult.IsSuccess())
                        {
                            return false;
                        }

                        List<KeyValuePair<int, List<SmallValue16AndFlag>>> changedClusterNodes = callResult.Data;
                        foreach (KeyValuePair<int, List<SmallValue16AndFlag>> changed in changedClusterNodes)
                        {
                            clusterNodeMapper.ResetValues(changed.Key, changed.Value);
                        }
                        return true;
                    }, context.GetMaxApiRetryCount(), context.GetRetryFailSleepMS());
                });
            });
            return CallServiceResult<bool>.GetSuccess(true);
        }

        /// <summary>
        /// Joins the items with <paramref name="sep"/>, tolerating a null sequence.
        /// </summary>
        /// <param name="sep">Separator placed between items.</param>
        /// <param name="array">Items to join; may be null.</param>
        /// <returns>The joined string, or an empty string when <paramref name="array"/> is null.</returns>
        internal string SafeStringJoin<T>(string sep, IEnumerable<T> array)
        {
            return array == null ? string.Empty : string.Join(sep, array);
        }

        /// <summary>
        /// For each (cluster id, node flags) pair in the request: merges the incoming flags with the
        /// stored ones, runs <c>DoDisposeClusterTempData</c> on the merged list, and collects the
        /// clusters for which <c>HasChanged</c> reports a difference from the stored list.
        /// </summary>
        /// <param name="request">Carries the list of cluster id / node flag pairs.</param>
        /// <returns>A success result holding the changed clusters with their merged flags.</returns>
        public CallServiceResult<List<KeyValuePair<int, List<SmallValue16AndFlag>>>> SendGetClusterNodeChangeToNotifyData(SendGetClusterNodeChangeToNotifyDataRequest request)
        {
            var incoming = request.clusterNodesList;
            var changed = new List<KeyValuePair<int, List<SmallValue16AndFlag>>>(incoming.Count);
            foreach (var entry in incoming)
            {
                int clusterId = entry.Key;
                List<SmallValue16AndFlag> stored = clusterNodeMapper.GetValues(clusterId);
                List<SmallValue16AndFlag> merged = MergeClusterNodeMappings(clusterId, stored, entry.Value);
                DoDisposeClusterTempData(clusterId, merged);
                if (!HasChanged(stored, merged))
                {
                    continue;
                }
                changed.Add(new KeyValuePair<int, List<SmallValue16AndFlag>>(clusterId, merged));
            }
            return CallServiceResult<List<KeyValuePair<int, List<SmallValue16AndFlag>>>>.GetSuccess(changed);
        }


        /// <summary>
        /// Reports whether <paramref name="newList"/> contains an item with no counterpart in
        /// <paramref name="oldList"/> carrying the same <c>Value</c> and <c>Status</c>.
        /// Items present only in the old list do not count as a change.
        /// </summary>
        /// <param name="oldList">Previously stored flags; null is treated as empty.</param>
        /// <param name="newList">Current flags; null is treated as empty.</param>
        /// <returns>True when at least one new item has no matching old item.</returns>
        public static bool HasChanged(List<SmallValue16AndFlag> oldList, List<SmallValue16AndFlag> newList)
        {
            // Nothing new means nothing can be unmatched.
            if (newList == null || newList.Count == 0)
            {
                return false;
            }
            // SendGetClusterNodeChangeToNotifyData passes the mapper's raw GetValues result as the
            // old list, which can be null; every new item is then a change.
            if (oldList == null || oldList.Count == 0)
            {
                return true;
            }

            foreach (var newItem in newList)
            {
                bool hasMatch = oldList.Any(old => old.Value == newItem.Value && old.Status == newItem.Status);
                if (!hasMatch)
                {
                    return true;
                }
            }
            return false;
        }

        /// <summary>
        /// Applies a cluster node update on this node: touches the cluster temp path when this node
        /// is among the nodes to sync, registers the cluster for this node if it is not yet known,
        /// merges the incoming node flags with the stored ones, and runs
        /// <c>DoDisposeClusterTempData</c> on the merged list.
        /// </summary>
        /// <param name="request">Carries the cluster id, all cluster node flags and the nodes to sync.</param>
        /// <returns>A success result holding the merged node flags.</returns>
        public CallServiceResult<List<SmallValue16AndFlag>> SendUpdateClusterNodes(SendUpdateClusterNodesRequest request)
        {
            int clusterId = request.clusterId;
            List<SmallValue16AndFlag> incomingNodes = request.allClusterNodes;
            List<short> nodesToSync = request.syncClusterNodes;

            if (nodesToSync != null && nodesToSync.Contains(dataNode.Id))
            {
                syncCollectionStore.SafeTouchClusterTempPath(dataNode.Id, clusterId);
            }

            List<SmallValue16AndFlag> storedNodes;
            if (clusterNodeMapper.Exists(clusterId))
            {
                storedNodes = clusterNodeMapper.GetValues(clusterId);
            }
            else
            {
                // First sighting of this cluster: record it against this node and merge from scratch.
                currentNodeClusterMapper.AppendValue(dataNode.Id, clusterId);
                storedNodes = null;
            }
            List<SmallValue16AndFlag> merged = MergeClusterNodeMappings(clusterId, storedNodes, incomingNodes);

            //Util.LogInfo("....update cluster:" + clusterId + " on node:" + dataNode.Id + ", exists:" + clusterNodeMapper.Exists(clusterId) + "," + string.Join(" ", mergedResult.Select(node => node.Value + ":" + node.Status)));
            DoDisposeClusterTempData(clusterId, merged);
            return CallServiceResult<List<SmallValue16AndFlag>>.GetSuccess(merged);
        }

        /// <summary>
        /// Deletes this node's temp path for the cluster when the node list holds an entry for this
        /// node whose status is <c>Synced</c> or <c>Deleted</c>.
        /// </summary>
        /// <param name="clusterId">Cluster whose temp path may be deleted.</param>
        /// <param name="allClusterNodes">Node flags of the cluster.</param>
        internal void DoDisposeClusterTempData(int clusterId, List<SmallValue16AndFlag> allClusterNodes)
        {
            bool thisNodeFinished = allClusterNodes.Any(nodeFlag =>
                nodeFlag.Value == dataNode.Id
                && (nodeFlag.Status == SmallValue16AndFlagStatus.Synced
                    || nodeFlag.Status == SmallValue16AndFlagStatus.Deleted));
            if (!thisNodeFinished)
            {
                return;
            }
            syncCollectionStore.SafeDeleteClusterTempPath(dataNode.Id, clusterId);
        }

        /// <summary>
        /// Receives synced collections for a cluster: appends their ids to the cluster's collection
        /// mapping, then saves each collection's messages through the sync store.
        /// </summary>
        /// <param name="request">Carries the cluster id and the collections with their messages.</param>
        /// <returns>Always Success(true).</returns>
        public CallServiceResult<bool> SendSyncCollectionData(SendSyncCollectionDataRequest request)
        {
            var receivedIds = request.collectionList.Select(item => item.CollectionId).ToList();
            clusterCollectionMapper.AppendValue(request.clusterId, receivedIds);

            foreach (var received in request.collectionList)
            {
                syncCollectionStore.SaveSyncMessages(
                    dataNode.Id,
                    request.clusterId,
                    received.CollectionId,
                    received.MessageList);
            }
            return CallServiceResult<bool>.GetSuccess(true);
        }

        /// <summary>
        /// Tells whether at least one node in the request's sync list is not offline.
        /// </summary>
        /// <param name="request">Carries the ids of the nodes to sync to.</param>
        /// <returns>True as soon as one node is found that is not offline; otherwise false.</returns>
        private bool ExistsOnlineSyncNode(SendSyncCollectionCmdRequest request)
        {
            foreach (short toNodeId in request.nodeToSyncList)
            {
                if (dataNode.IsNodeOffLine(toNodeId))
                {
                    continue;
                }
                return true;
            }
            return false;
        }

        /// <summary>
        /// Pushes every collection of a cluster to the nodes listed in the request.
        /// Collections are visited through <c>clusterCollectionMapper.VisitAll</c>; each visited batch
        /// is loaded and sent to all target nodes in parallel. Targets found offline are removed
        /// from the set of nodes to be marked as synced. When no target is online any more the
        /// visit is abandoned and no node is marked. Otherwise <c>RaiseSyncClusterFinished</c> is
        /// raised for every remaining target.
        /// </summary>
        /// <param name="request">Carries the cluster id and the ids of the nodes to sync to.</param>
        /// <returns>Always Success(true).</returns>
        public CallServiceResult<bool> SendSyncCollectionCmd(SendSyncCollectionCmdRequest request)
        {
            int clusterId = request.clusterId;
            HashSet<short> availableNodesSet = new HashSet<short>();
            foreach (short toNodeId in request.nodeToSyncList)
            {
                availableNodesSet.Add(toNodeId);
            }

            // HashSet<T> is not safe for concurrent writers and the Parallel.ForEach bodies below
            // call Remove/Clear on it, so every mutation goes through this gate.
            object availableNodesGate = new object();
            // Only ever flipped from false to true; a stale read just costs one extra check.
            bool allNodesOffline = false;
            Action markAllNodesOffline = () =>
            {
                allNodesOffline = true;
                lock (availableNodesGate)
                {
                    availableNodesSet.Clear();
                }
            };

            clusterCollectionMapper.VisitAll(clusterId, (collectionList) =>
            {
                if (allNodesOffline)
                {
                    markAllNodesOffline();
                    return true;
                }
                List<DataCollectionMessages> collectionMessages = new List<DataCollectionMessages>(collectionList.Count);
                foreach (int collectionId in collectionList)
                {
                    if (!ExistsOnlineSyncNode(request))
                    {
                        markAllNodesOffline();
                        return true;
                    }
                    List<TopicMessage> messageList = GetClusterMessageList(collectionId);
                    if (messageList == null)
                    {
                        continue;
                    }
                    collectionMessages.Add(new DataCollectionMessages
                    {
                        CollectionId = collectionId,
                        MessageList = messageList
                    });
                }

                Parallel.ForEach(request.nodeToSyncList, ClusterContext.GetParallelOptions(), (toNodeId) =>
                {
                    if (allNodesOffline)
                    {
                        markAllNodesOffline();
                        return;
                    }
                    if (!ExistsOnlineSyncNode(request))
                    {
                        markAllNodesOffline();
                        return;
                    }

                    if (dataNode.IsNodeOffLine(toNodeId))
                    {
                        RemoveSyncUnFinishCluster(toNodeId, clusterId);
                        lock (availableNodesGate)
                        {
                            availableNodesSet.Remove(toNodeId);
                        }
                        return;
                    }
                    var node = GetProxy(toNodeId);
                    if (node != null)
                    {
                        node.SendSyncCollectionData(new SendSyncCollectionDataRequest
                        {
                            topicEnum = topicEnum,
                            clusterId = clusterId,
                            fromNodeId = dataNode.Id,
                            collectionList = collectionMessages
                        });
                    }
                });

                // NOTE(review): returning true presumably tells VisitAll to stop - confirm against the mapper.
                return allNodesOffline;
            });

            if (!allNodesOffline)
            {
                // No writers remain at this point, so the set can be enumerated without the gate.
                Parallel.ForEach(availableNodesSet, ClusterContext.GetParallelOptions(), (toNodeId) =>
                {
                    RaiseSyncClusterFinished(clusterId, toNodeId);
                });
            }
            return CallServiceResult<bool>.GetSuccess(true);
        }

        /// <summary>
        /// Syncs the given clusters through <c>SyncClusterCollectionCore</c>, retrying the failed
        /// ones once after a 20 ms pause. Clusters that still fail are logged, unless the data
        /// node has been disposed.
        /// </summary>
        /// <param name="clusterList">Ids of the clusters to sync.</param>
        internal void SyncClusterCollection(List<int> clusterList)
        {
            List<int> stillFailing = SyncClusterCollectionCore(clusterList);
            if (stillFailing == null || stillFailing.Count == 0)
            {
                return;
            }

            Thread.Sleep(20);
            stillFailing = SyncClusterCollectionCore(stillFailing);
            if (stillFailing == null || stillFailing.Count == 0 || dataNode.isDisposed)
            {
                return;
            }

            foreach (var clusterId in stillFailing)
            {
                Util.LogInfo("error clusterId can't found in node:" + dataNode.Id + ",clusterId:" + clusterId);
            }
        }

        /// <summary>
        /// Marks a node as <c>Synced</c> in the cluster's node list - updating every existing entry
        /// for the node, or adding one when none exists - and then propagates the list through
        /// <c>EnsureUpdateClusterAllNodes</c>.
        /// </summary>
        /// <param name="clusterId">Cluster that finished syncing.</param>
        /// <param name="syncToNodeId">Node the cluster was synced to.</param>
        internal void RaiseSyncClusterFinished(int clusterId, short syncToNodeId)
        {
            List<SmallValue16AndFlag> clusterNodes = clusterNodeMapper.GetValues(clusterId)
                ?? new List<SmallValue16AndFlag>();

            List<SmallValue16AndFlag> entriesForNode = clusterNodes
                .Where(clusterNode => clusterNode.Value == syncToNodeId)
                .ToList();
            if (entriesForNode.Count == 0)
            {
                clusterNodes.Add(new SmallValue16AndFlag
                {
                    Value = syncToNodeId,
                    Status = SmallValue16AndFlagStatus.Synced
                });
            }
            else
            {
                foreach (var entry in entriesForNode)
                {
                    entry.Status = SmallValue16AndFlagStatus.Synced;
                }
            }

            EnsureUpdateClusterAllNodes(clusterId, clusterNodes);
        }

        /// <summary>
        /// Propagates a cluster's node list: sends the update to the cluster's nodes through
        /// <c>MultiSendUpdateClusterNodes</c>, then sends a save-change notification that also
        /// lists the cluster nodes currently not in the online node list.
        /// </summary>
        /// <param name="clusterId">Cluster being updated.</param>
        /// <param name="clusterNodeList">Full node flag list of the cluster.</param>
        internal void EnsureUpdateClusterAllNodes(int clusterId, List<SmallValue16AndFlag> clusterNodeList)
        {
            var distinctNodeIds = clusterNodeList.Select(nodeFlag => nodeFlag.Value).ToList().Distinct();
            var onlineNodeIds = dataNode.GetOnlineNodeIdList();
            List<short> offlineNodeIds = distinctNodeIds
                .Where(clusterNodeId => !onlineNodeIds.Contains(clusterNodeId))
                .ToList();

            MultiSendUpdateClusterNodes(clusterNodeList, clusterId, null);

            var notifyRequest = new SendSaveClusterNodeChangeToNotifyRequest
            {
                allClusterNodes = clusterNodeList,
                clusterId = clusterId,
                clusterOffLineNodes = offlineNodeIds,
                topicEnum = topicEnum
            };
            MultiSendSaveClusterNodeChangeToNotify(notifyRequest);
        }

        /// <summary>
        /// Sends the node list of <paramref name="clusterId"/> to every proxy returned by
        /// GetProxy(allClusterNodes), in parallel, wrapping each send in Util.SafeLoopUtilTrue.
        /// If any successful reply carries a list that HasChanged reports as different, that list is
        /// adopted and the whole send is performed one more time.
        /// </summary>
        /// <param name="allClusterNodes">Node/status list to distribute.</param>
        /// <param name="clusterId">Cluster the list belongs to.</param>
        /// <param name="syncClusterNodes">
        /// Optional ids of nodes that are about to be synced; may be null. When non-null and a proxy list
        /// was obtained, every id must have a matching proxy or the method returns false.
        /// </param>
        /// <returns>
        /// false only when the <paramref name="syncClusterNodes"/> check fails (on the first or the repeated
        /// pass); true otherwise. Failed sends do not influence the return value, because whatever
        /// Util.SafeLoopUtilTrue returns is not inspected here.
        /// </returns>
        public bool MultiSendUpdateClusterNodes(List<SmallValue16AndFlag> allClusterNodes, int clusterId, List<short> syncClusterNodes)
        {
            // Set by a worker when a reply carried a different node list.
            bool isChanged = false;
            Func<bool> doSendUpdate = () =>
            {
                // NOTE(review): the name suggests GetProxy yields proxies for the online members only - confirm.
                var clusterOnLineNodes = GetProxy(allClusterNodes);
                if (syncClusterNodes != null)
                {
                    // Fail if some node to be synced has no proxy. When GetProxy returned null the
                    // predicate is false for every id, so the check passes and nothing is sent below.
                    if (syncClusterNodes.Any(nodeId => clusterOnLineNodes!= null && !clusterOnLineNodes.Any(node => node.GetId() == nodeId)))
                    {
                        return false;
                    }
                }
                if (clusterOnLineNodes != null)
                {
                    var context = ClusterContext.Get();
                    Parallel.ForEach(GetSenderMoveToLastList(clusterOnLineNodes), ClusterContext.GetParallelOptions(), (node) => {
                        // Presumably retried until the callback returns true, bounded by the retry
                        // count and sleep taken from the context - verify against Util.SafeLoopUtilTrue.
                        Util.SafeLoopUtilTrue((loopI) => {
                            // Nothing to send to a node that is reported offline.
                            if (dataNode.IsNodeOffLine(node.id))
                            {
                                return true;
                            }
                            var callResult = node.SendUpdateClusterNodes(new SendUpdateClusterNodesRequest
                            {
                                topicEnum = topicEnum,
                                allClusterNodes = allClusterNodes,
                                clusterId = clusterId,
                                syncClusterNodes = syncClusterNodes
                            });
                            if (callResult.IsSuccess())
                            {
                                var newList = callResult.Data;
                                // Checked once without the lock and again inside it, so parallel
                                // workers adopt a differing list one at a time.
                                if (HasChanged(allClusterNodes, newList))
                                {
                                    lock (clusterOnLineNodes)
                                    {
                                        if (HasChanged(allClusterNodes, newList))
                                        {
                                            // Reassigns the captured parameter only; requests built
                                            // afterwards in this method use the new list.
                                            allClusterNodes = newList;
                                            isChanged = true;
                                        }
                                    }
                                }
                                return true;
                            }
                            else
                            {
                                return false;
                            }
                        }, context.GetMaxApiRetryCount(), context.GetRetryFailSleepMS());                        
                    });
                }
                return true;
            };
            if (!doSendUpdate())
            {
                return false;
            }
            // A reply changed the list: distribute the adopted list once more (at most one repeat).
            if (isChanged)
            {
                if (!doSendUpdate())
                {
                    return false;
                }
            }
            return true;
        }

        /// <summary>
        /// Examines each cluster in <paramref name="clusterList"/>. When this node is the first online
        /// member of a cluster that has fewer online members than <c>context.RongYu</c>, it selects extra
        /// online nodes, publishes the enlarged member list, and sends sync commands to the members that
        /// are already Synced so they copy the cluster to the new nodes.
        /// </summary>
        /// <param name="clusterList">Ids of the clusters to examine.</param>
        /// <returns>
        /// Ids of clusters the caller should retry: clusters with no member list recorded, and a cluster
        /// whose sync dispatch was interrupted because the node was disposed. Returns <c>null</c> when the
        /// node is found disposed at the start of an iteration.
        /// </returns>
        internal List<int> SyncClusterCollectionCore(List<int> clusterList)
        {
            var context = ClusterContext.Get();
            List<int> failedList = new List<int>();
            foreach (int clusterId in clusterList)
            {
                if (dataNode.isDisposed)
                {
                    return null;
                }

                List<SmallValue16AndFlag> clusterNodeList = clusterNodeMapper.GetValues(clusterId);
                if (clusterNodeList == null || clusterNodeList.Count == 0)
                {
                    // No member list recorded for this cluster: report it for a retry.
                    failedList.Add(clusterId);
                    continue;
                }

                // Entries flagged Deleted no longer count as members.
                clusterNodeList = clusterNodeList.Where(clusterNode => clusterNode.Status != SmallValue16AndFlagStatus.Deleted).ToList();
                if (clusterNodeList.Count == 0)
                {
                    continue;
                }

                var onlineNodes = dataNode.GetOnlineNodeIdList();
                var onLineClusterNodes = clusterNodeList.Where(clusterNode => onlineNodes.Contains(clusterNode.Value)).ToList();
                if (onLineClusterNodes.Count == 0)
                {
                    continue;
                }
                if (onLineClusterNodes.Count >= context.RongYu)
                {
                    // Enough online members already.
                    continue;
                }
                if (onLineClusterNodes.First().Value != dataNode.Id)
                {
                    // Only the first online member of the cluster drives the sync.
                    continue;
                }

                var availableOnLineClusterNodes = onLineClusterNodes.Where(nodeFlag => nodeFlag.Status == SmallValue16AndFlagStatus.Synced).ToList();
                if (availableOnLineClusterNodes.Count <= 0)
                {
                    // No fully synced online member to copy from.
                    continue;
                }

                int toSyncCount = Math.Min(context.RongYu, onlineNodes.Count) - onLineClusterNodes.Count;
                if (toSyncCount <= 0)
                {
                    continue;
                }

                var clusterNodeIdList = clusterNodeList.Select(nodeFlag => nodeFlag.Value).ToList();
                List<short> clusterOffLineNodes = clusterNodeIdList.Where(clusterNodeId => !onlineNodes.Contains(clusterNodeId)).ToList();

                // Enumerable.Max() throws InvalidOperationException on an empty sequence. That happens
                // when every member is online but the cluster is still under-replicated; the exception
                // used to abort the remaining clusters of the batch. With no offline member to anchor
                // on, order the candidates relative to this node's id instead.
                int anchorNodeId = dataNode.Id;
                if (clusterOffLineNodes.Count > 0)
                {
                    anchorNodeId = clusterOffLineNodes.Max();
                }

                // Candidates are online nodes that are not yet members; ids above the anchor are
                // preferred (closest first), then ids below it.
                List<Int16> toSyncList = onlineNodes.Where(onlineNodeId => onLineClusterNodes.FirstOrDefault(clusterNode => clusterNode.Value == onlineNodeId) == null)
                    .OrderBy(item => item > anchorNodeId ? item - anchorNodeId : anchorNodeId - item + MaxNodesCount).Take(toSyncCount).ToList();
                IEnumerable<SmallValue16AndFlag> toSyncValueFlag = toSyncList.Select(id => new SmallValue16AndFlag { Value = id, Status = SmallValue16AndFlagStatus.Syncing });
                List<SmallValue16AndFlag> allClusterNodes = toSyncValueFlag.Concat(clusterNodeList).ToList();
                if (!MultiSendUpdateClusterNodes(allClusterNodes, clusterId, toSyncList))
                {
                    continue;
                }
                MultiSendSaveClusterNodeChangeToNotify(new SendSaveClusterNodeChangeToNotifyRequest
                {
                    allClusterNodes = allClusterNodes,
                    clusterOffLineNodes = clusterOffLineNodes,
                    clusterId = clusterId,
                    topicEnum = topicEnum
                });

                // Spread the new nodes over the synced members, at most toSyncEach targets per member.
                int toSyncEach = (int)Math.Ceiling((decimal)toSyncCount / availableOnLineClusterNodes.Count);
                foreach (var nodeFlag in availableOnLineClusterNodes)
                {
                    if (toSyncList == null || toSyncList.Count == 0)
                    {
                        break;
                    }

                    List<short> nodeToSyncList = null;
                    if (toSyncList.Count > toSyncEach)
                    {
                        nodeToSyncList = toSyncList.Take(toSyncEach).ToList();
                        toSyncList.RemoveRange(0, toSyncEach);
                    }
                    else
                    {
                        nodeToSyncList = toSyncList;
                        toSyncList = null;
                    }

                    var node = GetProxy(nodeFlag.Value);
                    if (dataNode.isDisposed)
                    {
                        // Record the cluster once and stop dispatching; continuing the loop could
                        // add the same cluster id to failedList several times.
                        failedList.Add(clusterId);
                        break;
                    }
                    if (node != null)
                    {
                        node.SendSyncCollectionCmd(new SendSyncCollectionCmdRequest { topicEnum = topicEnum, 
                            clusterId = clusterId, nodeToSyncList = nodeToSyncList });
                        Thread.Sleep(1000);
                    }
                }
            }
            return failedList;
        }

        /// <summary>
        /// Long-running sync loop. After an initial wait (one 1000 ms sleep per unit returned by
        /// GetStartupWaitSyncInterval) and a call to RemoveAllSyncUnfinishedClusters, it repeatedly
        /// visits this node's clusters and runs SyncClusterCollection on each batch while more than one
        /// node is online, pausing 50 ms between rounds. The loop ends once the data node is disposed.
        /// </summary>
        /// <remarks>
        /// Exceptions inside a round are logged and the loop continues. An exception outside the loop is
        /// logged and rethrown unless the node has been disposed.
        /// </remarks>
        internal void SyncClusterCollection()
        {
            try
            {
                if (dataNode.isDisposed)
                {
                    return;
                }

                var context = ClusterContext.Get();
                int remainingWaits = context.GetStartupWaitSyncInterval();
                while (remainingWaits > 0)
                {
                    Thread.Sleep(1000);
                    remainingWaits--;
                }
                RemoveAllSyncUnfinishedClusters();

                while (!dataNode.isDisposed)
                {
                    try
                    {
                        var onlineNodeIds = dataNode.GetOnlineNodeIdList();
                        bool hasPeers = onlineNodeIds != null && onlineNodeIds.Count > 1;
                        if (hasPeers)
                        {
                            currentNodeClusterMapper.VisitAll(dataNode.Id, (clusterList) =>
                            {
                                if (dataNode.isDisposed)
                                {
                                    return true;
                                }

                                // After a failed batch, the callback returns true only if the node
                                // has been disposed in the meantime.
                                bool stopVisiting = false;
                                try
                                {
                                    SyncClusterCollection(clusterList);
                                }
                                catch (Exception batchError)
                                {
                                    Util.LogInfo(batchError.ToString());
                                    stopVisiting = dataNode.isDisposed;
                                }
                                return stopVisiting;
                            });
                        }
                    }
                    catch (Exception roundError)
                    {
                        Util.LogInfo(roundError.ToString());
                        if (dataNode.isDisposed)
                        {
                            return;
                        }
                    }
                    Thread.Sleep(50);
                }
            }
            catch (Exception fatalError)
            {
                Util.LogException("SyncClusterCollection", fatalError);
                if (dataNode.isDisposed)
                {
                    return;
                }
                throw;
            }
        }

        /// <summary>
        /// Creates a DiskIdToSmallValuesMapper for changes kept on behalf of
        /// <paramref name="offlineNodeId"/>, using the base path and per-cluster path that topicContext
        /// reports for this node and the offline node.
        /// </summary>
        /// <param name="offlineNodeId">Id of the offline node the mapper is created for.</param>
        /// <returns>A new mapper instance.</returns>
        internal IIdToSmallValuesMapper GetDiskOfflineMaperFunc(int offlineNodeId)
        {
            var localNodeId = dataNode.Id;
            // The returned context is not used below; the call is kept so behaviour is unchanged.
            var context = ClusterContext.Get();
            var basePath = topicContext.GetOfflineNodeChangeStorageBasePath(localNodeId, offlineNodeId);
            return new DiskIdToSmallValuesMapper(
                basePath,
                clusterId => topicContext.GetOfflineNodeChangeStoragePath(localNodeId, offlineNodeId, clusterId));
        }

        /// <summary>
        /// Under the string lock keyed by LockActiveCollectionStoreKey, disposes the active collection
        /// store (if there is one) and clears the field.
        /// </summary>
        public void ResetNodeStatus()
        {
            ClusterContext.StringLock.Invoke(LockActiveCollectionStoreKey, () =>
            {
                var activeStore = _activeCollectionStore;
                if (activeStore == null)
                {
                    return;
                }
                activeStore.DoDispose();
                _activeCollectionStore = null;
            });
        }

        /// <summary>
        /// Appends a Deleted entry for <paramref name="nodeId"/> to the node list of
        /// <paramref name="clusterId"/>, unless the list already holds an entry for that node whose
        /// status is Synced or Deleted. The list is then passed to EnsureUpdateClusterAllNodes in
        /// either case.
        /// </summary>
        /// <param name="nodeId">Node whose unfinished sync is being withdrawn.</param>
        /// <param name="clusterId">Cluster the node list belongs to.</param>
        internal void RemoveSyncUnFinishCluster(short nodeId, int clusterId)
        {
            // A missing node list is treated as an empty one.
            List<SmallValue16AndFlag> nodeFlags =
                clusterNodeMapper.GetValues(clusterId) ?? new List<SmallValue16AndFlag>();

            bool alreadySettled = nodeFlags.Any(flag =>
                flag.Value == nodeId
                && (flag.Status == SmallValue16AndFlagStatus.Synced
                    || flag.Status == SmallValue16AndFlagStatus.Deleted));

            if (!alreadySettled)
            {
                SmallValue16AndFlag deletedEntry = new SmallValue16AndFlag
                {
                    Value = nodeId,
                    Status = SmallValue16AndFlagStatus.Deleted
                };
                nodeFlags.Add(deletedEntry);
            }

            EnsureUpdateClusterAllNodes(clusterId, nodeFlags);
        }

        /// <summary>
        /// Asks syncCollectionStore for the cluster ids with an unfinished sync on this node, logs them,
        /// and calls RemoveSyncUnFinishCluster for each. Does nothing when the store returns null.
        /// </summary>
        internal void RemoveAllSyncUnfinishedClusters()
        {
            List<int> pendingClusterIds = syncCollectionStore.GetAllUnfinishedSyncClusterIds(dataNode.Id);
            if (pendingClusterIds == null)
            {
                return;
            }

            string joinedIds = string.Join(" ", pendingClusterIds);
            Util.LogInfo("........RemoveAllSyncUnfinishedClusters on " + dataNode.Id + ":" + joinedIds);

            foreach (int pendingClusterId in pendingClusterIds)
            {
                RemoveSyncUnFinishCluster(dataNode.Id, pendingClusterId);
            }
        }

        /// <summary>
        /// Handles an online-cluster assignment: remembers the cluster id as lastClusterId, appends it to
        /// this node's cluster list, and resets the cluster's node list to the requested nodes, each
        /// flagged Synced.
        /// </summary>
        /// <param name="request">Carries the assigned cluster id and its member node ids.</param>
        /// <returns>A success result wrapping <c>true</c>.</returns>
        public CallServiceResult<bool> SendAssignOnLineClusterNodes(SendAssignOnLineClusterNodesRequest request)
        {
            List<short> memberNodeIds = request.toNodes;
            int assignedClusterId = request.onLineClusterId;

            lastClusterId = assignedClusterId;
            currentNodeClusterMapper.AppendValue(dataNode.Id, assignedClusterId);

            List<SmallValue16AndFlag> syncedMembers =
                (from memberNodeId in memberNodeIds
                 select new SmallValue16AndFlag { Value = memberNodeId, Status = SmallValue16AndFlagStatus.Synced })
                .ToList();
            clusterNodeMapper.ResetValues(assignedClusterId, syncedMembers);

            return CallServiceResult<bool>.GetSuccess(true);
        }

        /// <summary>
        /// Handles an online-collection assignment. Under the string lock keyed by
        /// LockActiveCollectionStoreKey it disposes the current active store (if any), appends the
        /// collection id to the cluster's collection list, touches the collection on disk, and installs
        /// a new DiskCollectionOnlineMessageStore as the active store before calling its
        /// InitSyncedCollection.
        /// </summary>
        /// <param name="request">Carries the cluster id and the collection id being assigned.</param>
        /// <returns>A success result wrapping <c>true</c>.</returns>
        public CallServiceResult<bool> SendAssignOnlineCollection(SendAssignOnlineCollectionRequest request)
        {
            int assignedClusterId = request.clusterId;
            int assignedCollectionId = request.collectionId;

            ClusterContext.StringLock.Invoke(LockActiveCollectionStoreKey, () =>
            {
                var previousStore = _activeCollectionStore;
                if (previousStore != null)
                {
                    previousStore.DoDispose();
                    _activeCollectionStore = null;
                }

                clusterCollectionMapper.AppendValue(assignedClusterId, assignedCollectionId);
                DiskCollectionOnlineMessageStore.Touch(topicEnum, dataNode.Id, assignedCollectionId);

                // The field is assigned before initialisation, as in the original statement order.
                var freshStore = new DiskCollectionOnlineMessageStore(topicEnum, dataNode.Id, assignedClusterId, assignedCollectionId);
                _activeCollectionStore = freshStore;
                freshStore.InitSyncedCollection();
            });

            return CallServiceResult<bool>.GetSuccess(true);
        }

        /// <summary>
        /// Saves the request's message to the store returned by IsCollectionIdOnActive when that check
        /// succeeds for the request's collection id.
        /// </summary>
        /// <param name="request">Carries the target collection id and the message.</param>
        /// <returns>
        /// A success result wrapping <c>true</c> when the message was saved, or <c>false</c> when
        /// IsCollectionIdOnActive returned false.
        /// </returns>
        public CallServiceResult<bool> SendMessage(SendMessageRequest request)
        {
            DiskCollectionOnlineMessageStore activeStore;
            bool isActive = IsCollectionIdOnActive(request.collectionId, out activeStore);
            if (isActive)
            {
                activeStore.SaveOnlineMessage(request.message);
            }
            return CallServiceResult<bool>.GetSuccess(isActive);
        }
        
        /// <summary>
        /// Forwards the request to the consumer instance that consumerTopicDataProvider returns for the
        /// request's consumerEnum.
        /// </summary>
        /// <param name="request">The processed-tree sync request to forward.</param>
        /// <returns>A success result wrapping <c>true</c>.</returns>
        public CallServiceResult<bool> SendSyncProcessedTree(SendSyncProcessedTreeRequest request)
        {
            var consumerNode = this.consumerTopicDataProvider.GetInstance(request.consumerEnum);
            consumerNode.SendSyncProcessedTree(request);
            return CallServiceResult<bool>.GetSuccess(true);
        }
    }
}